package com.ihcy.mq.rocket.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;

import java.io.UnsupportedEncodingException;

@Slf4j
public abstract class AbstractMQConsumer implements CommandLineRunner {
    @Autowired
    private MQConsumerConfiguration mqConsumerConfiguration;

    @Override
    public void run(String... strings) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
                mqConsumerConfiguration.getGroupName() + this.getMQName());
        consumer.setNamesrvAddr(mqConsumerConfiguration.getNameServerAddr());
        consumer.subscribe("test normal topic", "*");
        // 如果是第一次启动，从队列头部开始消费
        // 如果不是第一次启动，从上次消费的位置继续消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 一次性消费的数量，默认1
        consumer.setConsumeMessageBatchMaxSize(mqConsumerConfiguration.getConsumeMessageBatchMaxSize());
        consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
            if (CollectionUtils.isEmpty(list)) {
                log.info("{}接受到的消息为空，不处理，直接返回成功", getMQName());
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            log.info("{} consumer start", getMQName());
            try {
                for (MessageExt messageExt : list) {
                    // 获取该消息重试次数
                    int reconsume = messageExt.getReconsumeTimes();
//                    if (reconsume == 3) {
//                        //消息已经重试了3次，如果不需要再次消费
//                        continue;
//                    }
                    execute(messageExt);
                }
                log.info("{} consumer end", getMQName());
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Exception e) {
                log.info("{} consumer fail {}", getMQName(), e.getMessage());
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }


        });
        consumer.start();
        log.info("[{} Consumer 已启动]", this.getMQName());
    }

    /**
     * MQ 名称
     */
    public abstract String getMQName();

    /**
     * MQ 消费队列
     */
    public abstract Pair<String, String> getTopic();

    /**
     * MQ 具体执行
     */
    public abstract void execute(MessageExt messageExt) throws UnsupportedEncodingException;

}